fix: async engine side-effect column propagation and collision resolution #509
…tion ExecutionGraph.set_side_effect() now uses first-writer-wins instead of last-writer-wins, matching sync engine semantics where earlier consumers see the first producer's value. This prevents false DAGCircularDependencyError when multiple generators declare the same side-effect column at different pipeline stages. AsyncTaskScheduler now includes side-effect columns in _instance_to_columns so their values are written to the RowGroupBufferManager and available to downstream prompt templates. Fixes #508
PR #509 Review: fix: async engine side-effect column propagation and collision resolution
Summary
This PR fixes two bugs in the async engine's handling of
Files changed: 3 (37 additions, 3 deletions)
Findings
Correctness
Observations
Testing
The new test
Gap: There is no unit test for the
Style / Conventions
Verdict
LGTM. This is a focused, well-reasoned bug fix. Both changes are correct and consistent with each other. The test coverage for
Greptile Summary
This PR fixes two bugs in the async engine's handling of

| Filename | Overview |
|---|---|
| packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py | Splits _instance_to_columns into two dicts: _gen_instance_to_columns (completion tracking, no side-effect columns) and _gen_instance_to_columns_including_side_effects (buffer writes, includes side-effect columns); all three run methods correctly use the new write-only dict. |
| packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/execution_graph.py | set_side_effect() now raises ConfigCompilationError on collision from a different producer (idempotent for same producer), preventing silent last-writer-wins overwrite that caused false DAGCircularDependencyError. |
| packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/dag.py | Adds early duplicate-side-effect-producer detection in topologically_sort_column_configs(), consistent with execution_graph.py validation; raises ConfigCompilationError with a clear message. |
| packages/data-designer-engine/tests/engine/dataset_builders/test_async_scheduler.py | New test verifies that side-effect columns appear in the buffer-write dict but not in the completion-tracking dict, directly exercising the invariant that prevents the CompletionTracker KeyError. |
| packages/data-designer-engine/tests/engine/dataset_builders/utils/test_dag.py | New test exercises the duplicate-side-effect-producer detection in topologically_sort_column_configs() using real custom column generators. |
| packages/data-designer-engine/tests/engine/dataset_builders/utils/test_execution_graph.py | New test confirms set_side_effect() raises ConfigCompilationError when two distinct producers register the same side-effect column name. |
Sequence Diagram
sequenceDiagram
participant S as AsyncTaskScheduler
participant R as _run_from_scratch / _run_cell / _run_batch
participant BM as RowGroupBufferManager
participant CT as CompletionTracker
Note over S: __init__: build two dicts
S->>S: _gen_instance_to_columns (real columns only)
S->>S: _gen_instance_to_columns_including_side_effects (real + side-effect columns)
S->>R: execute task (task.column)
R->>R: generator.agenerate(...)
R->>BM: update_batch / update_cell using _gen_instance_to_columns_including_side_effects
S->>CT: mark_row_range_complete / mark_cell_complete using _gen_instance_to_columns (NO side-effect columns)
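
The split shown in the diagram can be sketched roughly as follows. This is a minimal illustration, not the scheduler's actual code: `ToyGenerator`, `build_column_maps`, the `candidates` column, and keying the maps by `id(gen)` are all assumptions made for the example. Only the idea of one map for completion tracking and one for buffer writes comes from the PR.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ToyGenerator:
    """Hypothetical stand-in for a column generator instance."""

    name: str
    columns: tuple[str, ...]  # columns registered in the execution graph
    side_effect_columns: tuple[str, ...] = ()  # extra columns written as a by-product


def build_column_maps(generators):
    """Return (tracking_map, write_map), both keyed by generator identity (assumed)."""
    tracking = {}
    writes = {}
    for gen in generators:
        # Completion tracking only knows about columns that exist in the graph.
        tracking[id(gen)] = list(gen.columns)
        # Buffer writes must also carry the side-effect values downstream.
        writes[id(gen)] = list(gen.columns) + list(gen.side_effect_columns)
    return tracking, writes


gen = ToyGenerator(
    name="merge_and_build_candidates",
    columns=("candidates",),  # hypothetical real column
    side_effect_columns=("_merged_tagged_text",),
)
tracking, writes = build_column_maps([gen])
```

In this sketch the tracker never sees `_merged_tagged_text`, while the buffer write does, which is the invariant the new scheduler test is described as checking.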
Reviews (7): Last reviewed commit: "fix: reject duplicate side-effect produc..."
packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py
…cheduler Side-effect columns added to _instance_to_columns caused KeyError in CompletionTracker._validate_strategy() because they are not registered in the execution graph. Split into _instance_to_write_columns (buffer writes, includes side-effects) and _instance_to_columns (completion tracking, real columns only).
    First-writer-wins: if a side-effect column is already mapped to a
    different producer, the earlier mapping is kept. This matches sync
    engine semantics where earlier consumers see the first producer's
    value.
    """
    if side_effect_col not in self._side_effect_map:
        self._side_effect_map[side_effect_col] = producer
Isn't a side effect column 1:1 with its producer? In which scenario would multiple producers point to the same side effect column?
You're right that in a correctly configured pipeline it should be 1:1. The collision scenario comes from Anonymizer's detection workflow - it has branching pipeline paths where different stages declare the same side-effect columns (e.g. _merged_tagged_text is declared by both prepare_validation_inputs and merge_and_build_candidates), but only one path is active per run. With last-writer-wins the graph resolves the wrong producer and creates a false cycle.
That said, this is an unusual pattern and silently ignoring the second registration isn't great. I'll add a warning log on collision so misconfigurations don't go unnoticed.
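
One way to see how last-writer-wins can produce a false cycle is the toy model below. It is a sketch under stated assumptions: the three stage names, the `_shared` column, and `resolve_order` are hypothetical, and the standard-library `graphlib` stands in for the engine's own DAG code.

```python
from graphlib import CycleError, TopologicalSorter

# Hypothetical pipeline: (generator, required columns, side-effect columns).
STAGES = [
    ("stage_a", [], ["_shared"]),
    ("stage_b", ["_shared"], []),
    ("stage_c", ["stage_b"], ["_shared"]),  # re-declares the same side effect
]


def resolve_order(stages, first_writer_wins):
    """Map every column to its producer, then topologically sort the generators."""
    producer_of = {}
    for name, _, side_effects in stages:
        producer_of[name] = name  # each generator produces its own column
        for col in side_effects:
            if first_writer_wins and col in producer_of:
                continue  # keep the earlier producer
            producer_of[col] = name  # otherwise the later producer overwrites
    deps = {
        name: {producer_of[col] for col in required}
        for name, required, _ in stages
    }
    return list(TopologicalSorter(deps).static_order())


print(resolve_order(STAGES, first_writer_wins=True))

try:
    resolve_order(STAGES, first_writer_wins=False)
except CycleError as exc:
    print("false cycle:", exc.args[1])
```

With last-writer-wins, `_shared` resolves to `stage_c`, so `stage_b` appears to depend on `stage_c` while `stage_c` depends on `stage_b`.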
In this example, wouldn't the side-effect columns be prepare_validation_inputs _merged_tagged_text and merge_and_build_candidates_merged_tagged_text? It's still 1:1, right?
I looked into this - the collision appears to be real. Side-effect column names are bare strings with no namespacing. In Anonymizer's detection workflow (custom_columns.py), both merge_and_build_candidates and prepare_validation_inputs declare the exact same side-effect columns:

    # merge_and_build_candidates (line 79)
    side_effect_columns=[COL_MERGED_TAGGED_TEXT, COL_VALIDATION_CANDIDATES]
    # prepare_validation_inputs (line 125)
    side_effect_columns=[COL_MERGED_TAGGED_TEXT, COL_VALIDATION_CANDIDATES]

Both resolve to the same literal strings ("_merged_tagged_text", "_validation_candidates"). As far as I can tell, this is intentional - the two generators are alternative producers for the same downstream consumer depending on which pipeline path runs.
I considered whether namespacing (e.g. producer_name._merged_tagged_text) could solve this, but the downstream column config wouldn't know which producer ran, so it can't pick the right prefixed name. You'd likely need a union/alias mechanism in the DAG, which feels like a lot of complexity for a pattern that already works. The @custom_column_generator API is also public, so naming changes could break external plugins.
Open to other ideas, but first-writer-wins + warning seems like the right pragmatic fix here.
After our offline discussion, I agree with you - multiple producers for the same side-effect column is an anti-pattern. The Anonymizer case is actually an overwrite pattern within the same pipeline (not alternative paths), which works in the sync engine by accident.
I've updated set_side_effect() to raise ConfigCompilationError on duplicate producers instead of silently picking a winner (604aa21). We'll follow up on the Anonymizer side with distinct column names per stage.
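
For readers following the thread, the hard-error behaviour can be sketched as below. This is an illustration only: `ToyExecutionGraph` and the locally defined `ConfigCompilationError` are stand-ins rather than the engine's classes, and the error message wording is invented. Re-registering the same producer is treated as a no-op, in line with the "idempotent for same producer" note in the review summary.

```python
class ConfigCompilationError(Exception):
    """Local stand-in for the engine's error type (assumption)."""


class ToyExecutionGraph:
    """Hypothetical minimal graph that only tracks side-effect producers."""

    def __init__(self) -> None:
        self._side_effect_map: dict[str, str] = {}

    def set_side_effect(self, side_effect_col: str, producer: str) -> None:
        existing = self._side_effect_map.get(side_effect_col)
        if existing is not None and existing != producer:
            # A second, different producer is a configuration error.
            raise ConfigCompilationError(
                f"Side-effect column {side_effect_col!r} is already produced by "
                f"{existing!r}; cannot also be produced by {producer!r}."
            )
        # First registration, or the same producer registering again.
        self._side_effect_map[side_effect_col] = producer


graph = ToyExecutionGraph()
graph.set_side_effect("_merged_tagged_text", "merge_and_build_candidates")
graph.set_side_effect("_merged_tagged_text", "merge_and_build_candidates")  # no-op
```

Registering `prepare_validation_inputs` for the same column afterwards would raise in this sketch instead of silently picking a winner.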
packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py
Log a warning when multiple producers register the same side-effect column (first-writer-wins still applies). Rename _instance_to_columns and _instance_to_write_columns per review feedback for clarity.
PR #509 Review: fix: async engine side-effect column propagation and collision resolution
Summary
This PR fixes two bugs in the async engine's handling of
Files changed: 4 (+117/−19)
Findings
Correctness
Two-loop pattern (lines 139-150): The code correctly uses two separate loops over
Usage-site audit:
Minor Observations
Testing
Gap: No E2E test exercising the full async pipeline with side-effect columns end-to-end (acknowledged in PR description). Given the two unit tests cover the specific root causes and the full test suite passes (1747 engine tests), this is acceptable for merge. A follow-up E2E test would strengthen confidence for the Anonymizer use case.
Style / Conventions
Verdict
LGTM. This is a focused, well-reasoned bug fix. Both changes are correct and mutually consistent. The dual-map design in the scheduler cleanly separates completion tracking from buffer writes without disrupting existing async semantics. The first-writer-wins policy in the graph faithfully reproduces sync engine behavior. Test coverage is solid for both fixes. Minor style nits (verbose set comprehension, long line) are non-blocking. Ship it.
Replace first-writer-wins collision handling with a hard error. Each side-effect column must have exactly one producer; duplicates are a configuration issue to be fixed at the source.
Mirror the async path check: raise ConfigCompilationError when two custom columns declare the same side-effect column name during topological sort.
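
A pre-sort validation pass of this kind might look like the sketch below. The function name, the `(column_name, side_effect_columns)` input shape, the `candidates` and `validation_inputs` column names, and the message format are assumptions for illustration. The real check lives inside topologically_sort_column_configs() and may be structured differently.

```python
from collections import defaultdict


class ConfigCompilationError(Exception):
    """Local stand-in for the engine's error type (assumption)."""


def check_unique_side_effect_producers(configs):
    """Raise if any side-effect column is declared by more than one column config.

    `configs` is assumed to be an iterable of (column_name, side_effect_columns).
    """
    producers = defaultdict(list)
    for column_name, side_effect_columns in configs:
        for col in side_effect_columns:
            if column_name not in producers[col]:
                producers[col].append(column_name)

    duplicates = {col: names for col, names in producers.items() if len(names) > 1}
    if duplicates:
        details = "; ".join(
            f"{col!r} declared by {names}" for col, names in sorted(duplicates.items())
        )
        raise ConfigCompilationError(
            f"Side-effect columns must have exactly one producer: {details}"
        )


# Passes: each side-effect column has a single producer.
check_unique_side_effect_producers(
    [("candidates", ["_merged_tagged_text"]), ("validation_inputs", [])]
)
```

Running the check before sorting means the user sees the configuration problem directly rather than a downstream cycle error.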
Summary
Fix two bugs in the async engine's handling of @custom_column_generator side-effect columns that prevented multi-stage pipelines (e.g., Anonymizer's detection workflow) from running with DATA_DESIGNER_ASYNC_ENGINE=1.
Related Issue
Fixes #508
Changes
ExecutionGraph.set_side_effect() now uses first-writer-wins instead of last-writer-wins, matching sync engine semantics where earlier consumers see the first producer's value. Prevents false DAGCircularDependencyError when multiple generators declare the same side-effect column at different pipeline stages.
AsyncTaskScheduler now includes side-effect columns in _instance_to_columns so their values are written to the RowGroupBufferManager and available to downstream prompt templates.
Testing
make test passes (1747 engine tests)